package com.atguigu.gmall.realtime.app.function;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.bean.TableProcess;
import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;


/**
 * Routes change records to the main output or to a side output, driven by
 * {@link TableProcess} configuration rows received on the broadcast stream.
 *
 * <p>Broadcast side: every configuration change is parsed into a
 * {@link TableProcess}; for HBase sinks the target Phoenix table is created if
 * it does not exist, and the configuration is stored in broadcast state under
 * the key {@code sourceTable-operateType}.
 *
 * <p>Data side: every record is looked up in broadcast state by
 * {@code tableName-type}. When a configuration exists, the record's
 * {@code after} object is reduced to the configured sink columns, the sink
 * table name is attached under {@code sinkTable}, and the record is emitted to
 * the main output (Kafka sinks) or to the side output (HBase sinks).
 */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private final OutputTag<JSONObject> objectOutputTag;
    private final MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    // Created in open() on the task side; transient so it is never part of the
    // serialized function shipped to the workers.
    private transient Connection connection;

    /**
     * @param objectOutputTag    side-output tag that receives records destined for HBase
     * @param mapStateDescriptor descriptor of the broadcast state holding the table configuration
     */
    public TableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.objectOutputTag = objectOutputTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    /**
     * Loads the Phoenix JDBC driver and opens the connection used for table creation.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    /**
     * Closes the Phoenix connection opened in {@link #open(Configuration)}.
     */
    @Override
    public void close() throws Exception {
        try {
            if (connection != null) {
                connection.close();
                connection = null;
            }
        } finally {
            super.close();
        }
    }

    /**
     * Handles one configuration change from the broadcast stream.
     *
     * <p>Expected shape of {@code value}:
     * {@code {"db":"","tn":"","before":{},"after":{},"type":""}}
     */
    @Override
    public void processBroadcastElement(String value, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
        // 1. Parse the "after" image of the change into a TableProcess bean.
        JSONObject jsonObject = JSON.parseObject(value);
        String data = jsonObject.getString("after");
        TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
        if (tableProcess == null) {
            // No "after" image (presumably a delete of a configuration row - confirm
            // against the upstream change format); there is nothing to register.
            return;
        }

        // 2. For HBase sinks make sure the Phoenix table exists.
        if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
            checkTable(tableProcess.getSinkTable(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkExtend());
        }

        // 3. Store the configuration in broadcast state.
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
        broadcastState.put(key, tableProcess);
    }

    /**
     * Creates the Phoenix table if it does not exist yet. Every column is declared as
     * {@code varchar}. Generated statement:
     * {@code create table if not exists db.tn(id varchar primary key,tm_name varchar) xxx}
     *
     * @param sinkTable   table name, created inside {@code GmallConfig.HBASE_SCHEMA}
     * @param sinkColumns comma-separated column names
     * @param sinkPk      primary-key column; {@code "id"} is used when {@code null}
     * @param sinkExtend  text appended after the column list; empty when {@code null}
     * @throws RuntimeException when building or executing the statement fails
     */
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        if (sinkPk == null) {
            sinkPk = "id";
        }
        if (sinkExtend == null) {
            sinkExtend = "";
        }

        // The trailing space after "exists" is required to keep the keyword and
        // the schema name apart.
        StringBuilder createTableSQL = new StringBuilder("create table if not exists ")
                .append(GmallConfig.HBASE_SCHEMA)
                .append(".")
                .append(sinkTable)
                .append("(");

        String[] fields = sinkColumns.split(",");
        for (int i = 0; i < fields.length; i++) {
            String field = fields[i];
            createTableSQL.append(field);
            if (sinkPk.equals(field)) {
                createTableSQL.append(" varchar primary key");
            } else {
                createTableSQL.append(" varchar");
            }
            // Column definitions are comma separated; no comma after the last one.
            if (i < fields.length - 1) {
                createTableSQL.append(",");
            }
        }
        createTableSQL.append(") ").append(sinkExtend);

        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement(createTableSQL.toString());
            preparedStatement.execute();
        } catch (SQLException e) {
            // Keep the cause and the statement so the failure can be diagnosed.
            throw new RuntimeException("Phoenix建表" + sinkTable + "建表失败！ SQL: " + createTableSQL, e);
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    // Best effort: a failed close must not fail the job.
                    System.err.println("Failed to close Phoenix statement for table " + sinkTable + ": " + e);
                }
            }
        }
    }

    /**
     * Handles one data record: looks up its configuration, filters its columns and
     * routes it according to the configured sink type.
     */
    @Override
    public void processElement(JSONObject value, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // 1. Read the broadcast configuration for this table / operation.
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = value.getString("tableName") + "-" + value.getString("type");
        TableProcess tableProcess = broadcastState.get(key);

        if (tableProcess == null) {
            System.out.println("该组合Key：" + key + "不存在！");
            return;
        }

        // 2. Keep only the configured sink columns.
        JSONObject data = value.getJSONObject("after");
        if (data != null) {
            filterColumn(data, tableProcess.getSinkColumns());
        }

        // 3. Attach the target table/topic and route by sink TYPE (not by table name).
        value.put("sinkTable", tableProcess.getSinkTable());
        String sinkType = tableProcess.getSinkType();
        if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
            // Kafka-bound records go to the main output.
            out.collect(value);
        } else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
            // HBase-bound records go to the side output.
            ctx.output(objectOutputTag, value);
        }
    }

    /**
     * Removes from {@code data}, in place, every entry whose key is not listed in
     * {@code sinkColumns}.
     *
     * @param data        record payload to filter
     * @param sinkColumns comma-separated names of the columns to keep
     */
    private void filterColumn(JSONObject data, String sinkColumns) {
        List<String> columns = Arrays.asList(sinkColumns.split(","));
        data.entrySet().removeIf(entry -> !columns.contains(entry.getKey()));
    }

}
